package me.zingon.open.bigsmart.thread;

import lombok.extern.slf4j.Slf4j;
import me.zingon.open.bigsmart.constant.JOB_STATUS;
import me.zingon.open.bigsmart.queue.DelayJobBucket;
import me.zingon.open.bigsmart.queue.JobPoll;
import me.zingon.open.bigsmart.queue.ReadyQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 * @author ztc 1423047407@qq.com
 * @version 1.0
 * @date 2020-8-6 13:22
 */
@Component
@Slf4j
public class BucketTimer {

    @Autowired
    DelayJobBucket delayJobBucket;

    @Autowired
    ReadyQueue readyQueue;

    @Autowired
    JobPoll jobPoll;

    private ConcurrentSkipListSet<String> topics = new ConcurrentSkipListSet<>();

    @Async("bucketTimerExecutor")
    public void start(String topic) {
        if (topics.add(topic)) {
            this.run(topic);
        }
    }

    public void run(String topic) {
        log.info("启动 {} bucketTimer 线程", topic);
        for (; ; ) {
            try {
                Set<String> ids = delayJobBucket.getReadyJob(topic);
                if (ids.size() == 0) {
                    sleep(999L);
                    continue;
                }
                ids.stream().forEach(id -> {
                    delayJobBucket.rm(topic, id);
                    jobPoll.changeStatus(topic,id, JOB_STATUS.READY);
                });

                readyQueue.add(topic, ids);

                log.debug("{} bucketTimer 处理 {} 个 ", topic, ids.size());

                sleep(999L);
            } catch (Exception e) {
                log.error("{} bucketTimer 执行异常: {}", topic, e.getMessage());
                // TODO: 2020-8-6 删除readyQueue中此次入队数据
                break;
            }
        }
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            log.error("BucketTimer睡眠失败。", e);
        }
    }

}
